﻿/*!
  @author cestdio-studio
  @date 2016-09-12
  */
#include "tasknode.h"
#include <chrono>
#include <QMutex>
#include <QThread>
#include <QTextStream>
#include <QCoreApplication>
#include <QDir>
#include <QDebug>
#include <QAtomicInteger>
#include "taskproject.h"
#include "tb_interface.h"
#include "process_prctl.h"
#include "../watchdog/tbwatchdog.h"
#include "../watchdog/profile_log.h"

//! Custom event type, registered once with Qt, that is posted to a node when
//! packages or commands have been queued for writing (handled in customEvent()).
QEvent::Type taskNode::m_nPackEvent = static_cast<QEvent::Type>(QEvent::registerEventType());
/*!
 * \brief taskNode::taskNode Creates the node and its (not yet started) child
 * process, wires the process signals, allocates the package ring and starts
 * the 200 ms housekeeping timer.
 * \param parent QObject parent of this node.
 */
taskNode::taskNode(QObject *parent)
	: QObject(parent)
	,m_process(new QProcess(this))
	,m_nBatchTime(taskCell::default_batchtime)
{
	// An explicitly typed member pointer picks the (int, ExitStatus) form of
	// QProcess::finished.
	void (QProcess::* const finishedSignal)(int, QProcess::ExitStatus) = &QProcess::finished;

	// Life-cycle signals: each one is forwarded as a signal of this node first
	// and then handled by the matching local slot (connection order matters).
	connect(m_process,&QProcess::started,this,&taskNode::sig_pro_started);
	connect(m_process,&QProcess::started,this,&taskNode::slot_started);
	connect(m_process,finishedSignal,this,&taskNode::sig_pro_stopped);
	connect(m_process,finishedSignal,this,&taskNode::slot_stopped);

	// Data signals of the child process.
	connect(m_process,&QProcess::readyReadStandardOutput,
			this,&taskNode::slot_readyReadStandardOutput);
	connect(m_process,&QProcess::readyReadStandardError,
			this,&taskNode::slot_readyReadStandardError);
	connect(m_process,&QProcess::bytesWritten,
			this,&taskNode::slot_sended);

	// Package ring: every slot starts as a zero-filled buffer that is exactly
	// one package header long, with size, state and reference counter at 0.
	const auto headerBytes = sizeof(TASKBUS::subject_package_header);
	m_array_stdin.resize(m_bufsize_stdin,QByteArray(headerBytes,'\0'));
	m_size_stdin.resize(m_bufsize_stdin,0);
	m_status_stdin.resize(m_bufsize_stdin,0);
	m_cnt_stdin.resize(m_bufsize_stdin,QAtomicInteger<qsizetype>(0));

	// Periodic tick used by timerEvent().
	m_nBp_TimerID = startTimer(200);
}
/*!
 * \brief taskNode::~taskNode Stops the child process (if it is running)
 * before the node goes away.
 */
taskNode::~taskNode()
{
	// The return value only tells whether a running process had to be stopped.
	cmd_stop(this);
}
/*!
 * \brief taskNode::setCurrentProject Remembers the project that receives the
 * data packages read from this node's process.
 * \param p Project to route packages to; stored as-is (not owned here).
 */
void taskNode::setCurrentProject(taskProject * p)
{
	this->m_currPrj = p;
}
/*!
 * \brief taskNode::cmd_start Start the process to work.
 * Resets the package ring, the pending write queues and the I/O statistics,
 * then launches the executable with its own directory as working directory.
 * \param node   The taskNode instance that the signal sender wants to start;
 *               the call is ignored unless it is this object.
 * \param cm     Command line filename (the executable).
 * \param paras  Command line parameters.
 * \return false when \a node is another object or the process is not in the
 *         NotRunning state; true otherwise. Note that true is also returned
 *         when the executable file does not exist - in that case only an
 *         error message is emitted.
 */
bool taskNode::cmd_start(QObject * node,QString cm, QStringList paras)
{
	if (this != node)
		return false;
	m_bCmdStop =false;
	m_nCmdRestart = -1;
	if (QProcess::NotRunning != m_process->state())
		return false;

	// Bring every ring slot back to its initial (empty, unreferenced) state.
	for (qsizetype slot=0; slot<m_bufsize_stdin; ++slot)
	{
		m_status_stdin[slot] = 0;
		m_cnt_stdin[slot] = 0;
		m_size_stdin[slot] = 0;
	}
	m_pos_stdin = 0;
	m_bufsize_adjust = m_bufsize_stdin;

	// Forget anything that was still queued for the previous run.
	m_write_queue.clear();
	m_write_pos.clear();
	m_write_cmd.clear();

	// Launch the executable from inside its own directory.
	const QFileInfo exeInfo(cm);
	const QStringList args = paras;
	m_process->setProgram(exeInfo.absoluteFilePath());
	m_process->setWorkingDirectory(exeInfo.absolutePath());
	m_process->setArguments(args);
	m_process->start();

	if (!exeInfo.exists())
		emit_message(tr("Error:File %1 does not exist. Please Load the module first.").arg(cm).toUtf8());

	// Echo the complete command line as a message.
	QString echoed = cm;
	for (const QString & arg : args)
		echoed += " " + arg;
	emit_message(QByteArray(echoed.toStdString().c_str()));

	// Statistics restart from zero with every launch.
	m_spackage_sent= 0;
	m_sbytes_sent = 0;
	m_spackage_recieved = 0;
	m_sbytes_recieved = 0;
	return true;
}


/*!
 * \brief taskNode::cmd_stop Asks the child process to quit and kills it
 * afterwards.
 * A "quit" control package is written to the process, then the function
 * waits in slices of 20 ms (at most 99 slices) while events keep being
 * processed, and finally calls kill() unconditionally.
 * \param node The taskNode instance that should stop; the call is ignored
 *             unless it is this object.
 * \return false when \a node is another object or the process is not in the
 *         Running state; true otherwise.
 */
bool taskNode::cmd_stop(QObject * node)
{
	using namespace TASKBUS;
	if (this != node)
		return false;
	// Mark the stop as intentional so that no automatic restart is scheduled.
	m_bCmdStop = true;
	m_nCmdRestart = -1;
	if (QProcess::Running != m_process->state())
		return false;

	// Control package telling the module to terminate. The payload is sent
	// including its terminating '\0', which is why sizeof is used.
	const char quitCmd[] = "function=quit;ret=0;source=0;destin=0;class=taskbus;";
	const size_t payloadBytes = sizeof(quitCmd);

	subject_package_header header;
	header.prefix[0] = 0x3C;
	header.prefix[1] = 0x5A;
	header.prefix[2] = 0x7E;
	header.prefix[3] = 0x69;
	header.subject_id = control_subect_id();
	header.path_id = 1;
	header.data_length = static_cast<unsigned int>(payloadBytes);

	m_process->write(reinterpret_cast<const char *>(&header),sizeof(subject_package_header));
	m_process->write(quitCmd,payloadBytes);

	// Give the module a chance to leave on its own.
	for (int attempt = 1;
		 attempt < 100 && m_process->state()==QProcess::Running;
		 ++attempt)
	{
		m_process->waitForFinished(20);
		QCoreApplication::processEvents(QEventLoop::ExcludeUserInputEvents);
	}
	m_process->kill();

	return true;
}

void taskNode::slot_started()
{
	if (m_bDebug)
	{
		QString strDebugDir = dbgdir();
		m_dbgfile_stderr.close();
		m_dbgfile_stdin.close();
		m_dbgfile_stdout.close();

		m_dbgfile_stderr.setFileName(strDebugDir+"/stderr.txt");
		m_dbgfile_stdin.setFileName(strDebugDir+"/stdin.dat");
		m_dbgfile_stdout.setFileName(strDebugDir+"/stdout.dat");

		if (m_dbgfile_stderr.open(QIODevice::WriteOnly)==true)
		{
			QTextStream st(&m_dbgfile_stderr);
			st<<m_process->arguments().size()+1<<"\n";
			st<<m_process->program();
			foreach (QString a,m_process->arguments())
				st<<"\n"<<a;
			st<<"\n";
			st.flush();
		}
		m_dbgfile_stdout.open(QIODevice::WriteOnly);
		m_dbgfile_stdin.open(QIODevice::WriteOnly);

	}

	if (m_pCell)
	{
		QMap<QString,QVariant> vm
			= m_pCell->additional_paras(m_pCell->function_firstname());
		if (vm.contains("nice"))
		{
			int nic = vm["nice"].toInt();
			TASKBUS::set_proc_nice(m_process,nic);
		}
	}

	tb_watch_dog().watch(m_process);

}

/*!
 * \brief taskNode::slot_stopped Runs when the child process has finished.
 * If the stop was not requested through cmd_stop(), a restart countdown of
 * 15 timer ticks is armed (see timerEvent()) and a message is emitted.
 */
void taskNode::slot_stopped()
{
	// An intentional stop must not trigger a restart.
	if (m_bCmdStop)
		return;

	m_nCmdRestart = 15;
	emit_message(QByteArray("Module Stoppped. Restart in 3 seconds"));
}
/*!
 * \brief taskNode::slot_readyReadStandardOutput
 * Reads TASKBUS packages from the child process into the ring of package
 * buffers and dispatches every package that has arrived completely.
 * For subsequent processing needs,
 * this will wait until a complete package arrives before sending; a package
 * that is only partially available stays in its ring slot and is continued
 * on a later call.
 *
 * Per-slot state kept in m_status_stdin:
 *  - 0: reading the package header
 *  - 1: header accepted, reading the payload
 *  - 2: package complete, ready to be dispatched
 *  - 3: package dispatched, slot content is old and may be overwritten
 */
void taskNode::slot_readyReadStandardOutput()
{
	LOG_PROFILE("IO","Start Recieving packs.");
	// Loop for as long as the process device reports pending data
	// (presumably the number of bytes that can be read right now).
	qsizetype total_sz = m_process->size();
	int badHeader = 0;
	while (total_sz)
	{
		// Current ring slot and its bookkeeping (bytes read, reference count, state).
		const qsizetype pos = m_pos_stdin % m_bufsize_adjust;
		QByteArray & curr_array = m_array_stdin[pos];
		qsizetype & readBufMax = m_size_stdin[pos];
		QAtomicInteger<qsizetype> & cnt = m_cnt_stdin[pos];
		int & stat = m_status_stdin[pos];
		// The slot is still referenced (flush_write() decrements this counter
		// after writing a slot out), so it must not be overwritten: stop here.
		if (cnt>0)
			break;
		// The slot holds an already dispatched package: recycle it.
		if (stat==3)
		{
			readBufMax = 0;
			stat = 0;
		}
		auto * header =	reinterpret_cast<const TASKBUS::subject_package_header *>  (curr_array.data());
		// State 0: read the header.
		if (stat==0)
		{
			// Wait until at least a whole header is available.
			if (total_sz<sizeof(TASKBUS::subject_package_header))
				break;
			auto needRead = sizeof(TASKBUS::subject_package_header) - readBufMax ;
			auto red = m_process->read(curr_array.data()+readBufMax,needRead);
			readBufMax += red;
			if (readBufMax == sizeof(TASKBUS::subject_package_header))
			{
				// A valid header starts with the bytes 0x3C 0x5A 0x7E 0x69.
				if (header->prefix[0]==0x3C && header->prefix[1]==0x5A &&	header->prefix[2]==0x7E && header->prefix[3]==0x69)
				{
					stat = 1;
				}
				else
				{
					// Wrong prefix: count it and drop these header-sized bytes;
					// the next loop iteration tries the following bytes as a header.
					++badHeader;
					readBufMax = 0;
				}
			}
			Q_ASSERT(readBufMax <= sizeof(TASKBUS::subject_package_header));
		}
		// State 1: read the payload announced by the header.
		if (stat==1)
		{
			const qsizetype packAllSize = sizeof(TASKBUS::subject_package_header)+header->data_length;
			if (curr_array.size()<packAllSize)
			{
				// Grow the slot; the buffer may move, so refresh the header pointer.
				curr_array.resize(packAllSize);
				header = reinterpret_cast<const TASKBUS::subject_package_header *>  (curr_array.data());
			}
			auto needRead = packAllSize - readBufMax ;
			auto red = m_process->read(curr_array.data()+readBufMax,needRead);
			readBufMax += red;
			if (readBufMax==packAllSize)
			{
				stat = 2;
			}
			Q_ASSERT(readBufMax <= packAllSize);
		}

		// State 2: the package is complete, dispatch it.
		if (stat==2)
		{
			const qsizetype pack_size = sizeof(TASKBUS::subject_package_header)+header->data_length;
			// Global and per-node statistics for the package just read.
			extern QAtomicInteger<quint64>  g_totalrev;
			g_totalrev += readBufMax;
			++m_spackage_sent;
			m_sbytes_sent += sizeof(TASKBUS::subject_package_header)+header->data_length;
			if (header->subject_id == TB_SUBJECT_CMD)
			{
				// Command package: the payload is a text command that is
				// expected to end with '\0'.
				const char * pCmd = (const char *)header+sizeof(TASKBUS::subject_package_header);
				QString cmd = QString::fromUtf8(pCmd,header->data_length);
				QMap<QString, QVariant> map_z = taskCell::string_to_map(cmd);
				// Remember the first "source" seen as this node's uuid, and
				// forward commands that carry both "source" and "destin".
				if (map_z.contains("source"))
				{
					if(m_uuid.size()==0 )
						m_uuid = map_z["source"].toString();
					if (map_z.contains("destin"))
						emit sig_new_command(map_z);
				}
			}
			// Any other package is handed to the project for routing, by slot index.
			else if (m_currPrj)
				m_currPrj->routing_new_package(this,pos);
			if (m_bDebug)
				log_package(true,(char *)header,pack_size);
			// Mark the slot as dispatched and move on to the next one.
			stat = 3;
			++m_pos_stdin;
			// After the 8th package: size the ring from the largest of the
			// first 8 slots (at least 32 bytes) so that about
			// taskCell::default_ringcache MB are buffered, clamped to
			// the range [8, m_bufsize_stdin] slots.
			if (m_pos_stdin==8)
			{
				qsizetype sza = 0;
				for (int ia = 0;ia<8;++ia)
					if (sza < m_size_stdin[ia])
						sza = m_size_stdin[ia];
				if (sza < 32)
					sza = 32;
				m_bufsize_adjust = taskCell::default_ringcache * 1024 * 1024 / sza;
				if (m_bufsize_adjust > m_bufsize_stdin)
					m_bufsize_adjust = m_bufsize_stdin;
				if (m_bufsize_adjust < 8)
					m_bufsize_adjust = 8;
				emit_message(QString("Adjusted buffer ring size : %1 MB / %2 Bytes = %3 frames.").arg(taskCell::default_ringcache).arg(sza).arg(m_bufsize_adjust).toUtf8());
			}
		}
		total_sz = m_process->size();
	}

	// NOTE(review): the text says "Aborting", but nothing is aborted here -
	// bad headers are skipped and reading continues.
	if (badHeader)
		emit_message(QByteArray("Error header recieved. "
								"Header must be 0x3C, 0x5A, 0x7E,"
								" 0x69. Aborting."));
	LOG_PROFILE("IO","End Recieving packs.");
}

/*!
 * \brief taskNode::emit_message Queues one message and flushes the queue as a
 * batch when the batching window has elapsed.
 * Signals and slots are easy to use, but emitting one signal per message
 * without buffering costs too much CPU. Messages are therefore cached and
 * sent out together: if the time since the last flush is at most
 * m_nBatchTime milliseconds, the message is only stored (the periodic timer
 * flushes it later); otherwise everything cached so far is sent immediately.
 * A non-positive m_nBatchTime disables batching.
 *
 * The elapsed time is taken from a monotonic wall clock. clock() is not
 * suitable here because it reports processor time used by the program, which
 * is unrelated to the real time between two messages.
 *
 * NOTE(review): the time stamp is a function-local static and therefore
 * shared by all taskNode instances; making it per-node needs a data member.
 * \param arred single message.
 */
void taskNode::emit_message(QByteArray arred)
{
	using steady = std::chrono::steady_clock;
	static steady::time_point last_flush = steady::now();

	const steady::time_point now = steady::now();
	bool keep = false;
	if (m_nBatchTime>0)
	{
		// Whole milliseconds since the last flush; the clock never goes
		// backwards, so the value cannot be negative.
		const long long elapsed_ms =
			std::chrono::duration_cast<std::chrono::milliseconds>(now - last_flush).count();
		keep = (elapsed_ms <= m_nBatchTime);
	}
	m_arr_Strerr.push_back(arred);
	if (!keep)
	{
		flush_from_stderr();
		last_flush = now;
	}
}

/*!
 * \brief taskNode::flush_from_stderr Emits all cached messages as one batch
 * through sig_new_errmsg and empties the cache. Does nothing when no message
 * is cached.
 */
void taskNode::flush_from_stderr()
{
	if (m_arr_Strerr.size() == 0)
		return;

	emit sig_new_errmsg(m_arr_Strerr);
	m_arr_Strerr.clear();
}

/*!
 * \brief taskNode::timerEvent Housekeeping on the node's periodic timer.
 * On each tick of the timer started in the constructor this
 *  - reports I/O statistics on every third count while the process runs,
 *  - flushes cached messages,
 *  - advances the restart countdown and restarts the process when it expires.
 * Events of other timers are ignored.
 *
 * NOTE(review): the tick counter is a function-local static, i.e. shared by
 * all taskNode instances, so "every third count" is not per node.
 * \param event The timer event.
 */
void taskNode::timerEvent(QTimerEvent *event)
{
	if (event->timerId()!=m_nBp_TimerID)
		return;

	// Statistics are only reported on every third count.
	static int ct = 0;
	const bool reportNow = (++ct % 3 ==0);
	if (reportNow && isRunning())
	{
		const qint64 pid = TASKBUS::get_procid(m_process);
		emit sig_iostat(pid,m_spackage_recieved,m_spackage_sent,m_sbytes_recieved,	m_sbytes_sent);
	}

	flush_from_stderr();

	// Restart countdown: nothing to do while it is not armed.
	if (m_nCmdRestart<0)
		return;
	--m_nCmdRestart;
	const bool expired = (m_nCmdRestart==-1);
	if (expired && !m_bCmdStop)
	{
		m_spackage_sent= 0;
		m_sbytes_sent = 0;
		m_spackage_recieved = 0;
		m_sbytes_recieved = 0;
		if (!isRunning())
			m_process->start();
	}
}




/*!
 * \brief taskNode::log_package Logging debug information: appends one raw
 * package to the matching dump file and flushes it. Only active in debug
 * mode and only if the target file is open.
 * \param fromStdOut true when the package came from the stdout of the
 *                   process (goes to the stdout dump); false when it is
 *                   written to the process (goes to the stdin dump).
 * \param pkg        First byte of the package.
 * \param sz         Number of bytes to write.
 */
void taskNode::log_package(bool fromStdOut,char * pkg, qsizetype sz)
{
	if (!m_bDebug)
		return;

	// Pick the dump file by direction.
	auto & dumpFile = fromStdOut ? m_dbgfile_stdout : m_dbgfile_stdin;
	if (!dumpFile.isOpen())
		return;

	dumpFile.write(pkg,sz);
	dumpFile.flush();
}

void taskNode::slot_readyReadStandardError()
{
	QByteArray arred =m_process->readAllStandardError();
	extern QAtomicInteger<quint64> g_totalrev;
	g_totalrev += arred.size();

	m_arrStdErr.append(arred);
	const int idxR = m_arrStdErr.lastIndexOf('\n');
	if (idxR>=0)
	{
		emit_message(m_arrStdErr.left(idxR));
		m_arrStdErr = m_arrStdErr.mid(idxR+1);
	}
	else if (m_arrStdErr.size()>=4096)
	{
		emit_message(m_arrStdErr);
		m_arrStdErr.clear();
	}



	if (m_bDebug && m_dbgfile_stderr.isOpen())
	{
		QTextStream st(&m_dbgfile_stderr);
		st<<arred;
		st.flush();
	}

}
/*!
 * \brief taskNode::enqueue_write Queues one package of another node's ring
 * for writing to this node's process.
 * The (node, slot) pair is appended under the queue mutex. A pack event is
 * posted only when both pending queues were empty before, since an event
 * for the earlier entries is then already on its way.
 * \param node Node whose ring holds the package.
 * \param pos  Slot index inside that node's ring.
 * \return Always true.
 */
bool taskNode::enqueue_write(taskNode * node, qsizetype pos)
{
	m_mtx_queue.lock();
	const bool wasIdle = m_write_queue.isEmpty() && m_write_cmd.isEmpty();
	m_write_queue.append(node);
	m_write_pos.append(pos);
	m_mtx_queue.unlock();

	++m_spackage_recieved;

	if (wasIdle)
		QCoreApplication::postEvent(this,new QEvent(m_nPackEvent));
	return true;
}

/*!
 * \brief taskNode::customEvent Reacts to the pack event by writing out
 * everything that is queued; other custom events are ignored.
 * \param evt The received event.
 */
void taskNode::customEvent(QEvent * evt)
{
	if (evt->type()!=m_nPackEvent)
		return;
	flush_write();
}
void taskNode::flush_write()
{
	m_mtx_queue.lock();
	QList<taskNode *> write_queue = m_write_queue;
	QList<qsizetype> write_pos = m_write_pos;
	QByteArrayList write_cmd = m_write_cmd;
	m_write_queue.clear();
	m_write_pos.clear();
	m_write_cmd.clear();
	//qDebug()<<write_queue.size();
	m_mtx_queue.unlock();

	while (write_queue.size())
	{
		taskNode * node = write_queue.first();
		qsizetype pos = write_pos.first();

		write_queue.pop_front();
		write_pos.pop_front();

		QByteArray & arr = node->get_stdin_array(pos);
		qsizetype & sz = node->get_stdin_size(pos);
		QAtomicInteger<qsizetype> & cnt = node->get_stdin_cnt(pos);
		auto * header =	reinterpret_cast<const TASKBUS::subject_package_header *>  (arr.data());

		Q_ASSERT(sz <= arr.size() && header->data_length+sizeof(TASKBUS::subject_package_header)==sz );
		Q_ASSERT(cnt>0);
		if (m_bDebug)
			log_package(false,arr.data(),sz);
		m_sbytes_recieved += sz;
		if (m_process->state()==QProcess::Running)
			m_process->write(arr.constData(),sz);
		--cnt;
	}
	while (write_cmd.size())
	{
		if (m_process->state()==QProcess::Running)
			m_process->write(write_cmd.first());
		write_cmd.pop_front();
	}
}
/*!
 * \brief taskNode::cmd_sendcmd Queues a command package for this node's
 * process.
 * The command is accepted only when \a destins contains this node's uuid or
 * "0", and when it was not sent by this node itself. The package consists of
 * the 4 prefix bytes, 4 bytes 0xFF, 4 bytes 0x00, the payload length as
 * 4 bytes (least significant byte first) and the '\0'-terminated UTF-8
 * command text.
 * \param cmd     Key/value pairs of the command.
 * \param destins Destination ids the command is addressed to.
 * \return true when the command was queued, false when it was not meant for
 *         this node.
 */
bool taskNode::cmd_sendcmd(QMap<QString,QVariant> cmd, QSet<QString> destins)
{
	const bool addressed = destins.contains(m_uuid) || destins.contains("0");
	if (!addressed)
		return false;
	// Never echo a command back to its sender.
	if (cmd.contains("source") && cmd["source"]==m_uuid)
		return false;

	// Payload: command text as UTF-8 plus the terminating zero.
	const QString text = taskCell::map_to_string(cmd);
	QByteArray payload = text.toUtf8();
	payload.append('\0');
	const int sz = payload.size();

	QByteArray frame;
	frame.append(0x3C);
	frame.append(0x5A);
	frame.append(0x7E);
	frame.append(0x69);
	frame.append(4,0xFFu);
	frame.append(4,0x00u);
	// Payload length, least significant byte first.
	for (int shift = 0; shift < 32; shift += 8)
		frame.append((sz>> shift) & 0xff);
	frame.append(payload);

	m_mtx_queue.lock();
	const bool wasIdle = m_write_queue.isEmpty() && m_write_cmd.isEmpty();
	m_write_cmd.push_back(frame);
	m_mtx_queue.unlock();

	// Only the first entry of an empty queue needs to wake up the writer.
	if (wasIdle)
		QCoreApplication::postEvent(this,new QEvent(m_nPackEvent),Qt::HighEventPriority);
	return true;
}

/*!
 * \brief taskNode::slot_sended Adds the bytes just written to the child
 * process to the global sent-bytes counter.
 * \param b Number of bytes reported by QProcess::bytesWritten.
 */
void taskNode::slot_sended(qint64 b)
{
	// The counter itself is defined elsewhere.
	extern QAtomicInteger<quint64> g_totalsent;
	g_totalsent += b;
}
/*!
 * \brief taskNode::isRunning Tells whether the child process is active.
 * \return true in every process state except QProcess::NotRunning.
 */
bool taskNode::isRunning ()
{
	return m_process->state()!=QProcess::NotRunning;
}

/*!
 * \brief taskNode::dbgdir Returns the directory for this node's debug dumps.
 * With a process object the path is "<application dir>/debug/pid<process id>"
 * and the directory is created; without one, "<application dir>/debug" is
 * returned and nothing is created.
 * \return The directory path.
 */
QString taskNode::dbgdir()
{
	const QString baseDir = QCoreApplication::applicationDirPath() + "/debug";
	if (!m_process)
		return baseDir;

	const QString processDir = baseDir + QString("/pid%1").arg(m_process->processId());
	QDir().mkpath(processDir);
	return processDir;
}
